// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements.  See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License.  You may obtain a copy of the License at
//
//    http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package jobopts contains shared options for job submission. These options
// are exposed to allow user code to inspect and modify them.
package jobopts

import (
	"context"
	"flag"
	"fmt"
	"os"
	"strings"
	"time"

	"sync/atomic"

	"github.com/apache/beam/sdks/go/pkg/beam/internal/errors"
	"github.com/apache/beam/sdks/go/pkg/beam/log"
)

// Command-line flags shared by job submission. Each variable is the pointer
// returned by the flag package, so its value is only meaningful once
// flag.Parse has run; callers may also overwrite the pointed-to value.
var (
	// Endpoint holds the address of the job service. It defaults to empty.
	Endpoint = flag.String(
		"endpoint",
		"",
		"Job service endpoint (required).",
	)

	// JobName holds the user-chosen job name. It defaults to empty.
	JobName = flag.String(
		"job_name",
		"",
		"Job name (optional).",
	)

	// EnvironmentType names the kind of environment the user code runs in.
	// It defaults to DOCKER.
	EnvironmentType = flag.String(
		"environment_type",
		"DOCKER",
		"Environment Type. Possible options are DOCKER and PROCESS.",
	)

	// EnvironmentConfig holds the configuration of the environment the user
	// code runs in. Its expected format depends on the environment type, as
	// spelled out in the flag's help text. It defaults to empty.
	EnvironmentConfig = flag.String(
		"environment_config",
		"",
		"Set environment configuration for running the user code.\n"+
			"For DOCKER: Url for the docker image.\n"+
			`For PROCESS: json of the form {"os": "<OS>", "arch": "<ARCHITECTURE>", `+
			`"command": "<process to execute>", "env":{"<Environment variables 1>": "<ENV_VAL>"} }. `+
			"All fields in the json are optional except command.",
	)

	// WorkerBinary holds the location of the compiled worker binary. If not
	// specified, the binary is produced via go build.
	WorkerBinary = flag.String(
		"worker_binary",
		"",
		"Worker binary (optional)",
	)

	// Experiments holds a comma-separated list of experimental runner
	// features to enable. It defaults to empty.
	Experiments = flag.String(
		"experiments",
		"",
		"Comma-separated list of experiments (optional).",
	)

	// Async reports whether job submission should return without waiting
	// for the job to complete. It defaults to false.
	Async = flag.Bool(
		"async",
		false,
		"Do not wait for job completion.",
	)

	// Strict reports whether additional validation is applied to user
	// pipelines before executing them, failing early if they don't pass.
	// It defaults to false.
	Strict = flag.Bool(
		"beam_strict",
		false,
		"Apply additional validation to pipelines.",
	)
)

// GetEndpoint returns the value of the Endpoint flag. If the flag is empty it
// returns an empty string and a non-nil error telling the user to pass
// --endpoint; it never terminates the process itself. Convenience function.
func GetEndpoint() (string, error) {
	if endpoint := *Endpoint; endpoint != "" {
		return endpoint, nil
	}
	return "", errors.New("no job service endpoint specified. Use --endpoint=<endpoint>")
}

// unique is a process-wide counter, advanced atomically, that distinguishes
// job names generated by GetJobName.
var unique int32

// GetJobName returns the value of the JobName flag when it is non-empty.
// Otherwise it returns a freshly generated name of the form
// "go-job-<counter>-<unix nanoseconds>", where the counter is incremented on
// every generated name. Convenience function.
func GetJobName() string {
	if name := *JobName; name != "" {
		return name
	}
	// Advance the counter before reading the clock, so names generated in
	// the same nanosecond still differ.
	seq := atomic.AddInt32(&unique, 1)
	return fmt.Sprintf("go-job-%v-%v", seq, time.Now().UnixNano())
}

// GetEnvironmentUrn returns the URN of the environment used to run the SDK
// Harness, selected by the EnvironmentType flag (compared case-insensitively):
// "process" yields "beam:env:process:v1" and "docker" yields
// "beam:env:docker:v1". An empty or unrecognized type falls back to the docker
// URN, and the fallback is logged through ctx. Convenience function.
func GetEnvironmentUrn(ctx context.Context) string {
	const (
		defaultType = "DOCKER"
		dockerURN   = "beam:env:docker:v1"
		processURN  = "beam:env:process:v1"
	)

	switch strings.ToLower(*EnvironmentType) {
	case "process":
		return processURN
	case "docker":
		return dockerURN
	case "":
		log.Infof(ctx, "No environment type specified. Using default environment: '%v'", defaultType)
		return dockerURN
	default:
		// Name the rejected value and the fallback separately; logging the
		// flag value as "the default" would misreport what is actually used.
		log.Infof(ctx, "Unknown environment type '%v'. Using default environment: '%v'", *EnvironmentType, defaultType)
		return dockerURN
	}
}

// GetEnvironmentConfig returns the value of the EnvironmentConfig flag. If
// the flag is empty, it first sets the flag to the default
// "apachebeam/go_sdk:latest" and logs that choice, so subsequent calls see
// the stored default and do not log again. Convenience function.
func GetEnvironmentConfig(ctx context.Context) string {
	if *EnvironmentConfig != "" {
		return *EnvironmentConfig
	}

	// Store the default in the flag itself so later reads of
	// EnvironmentConfig observe the same value.
	*EnvironmentConfig = os.ExpandEnv("apachebeam/go_sdk:latest")
	log.Infof(ctx, "No environment config specified. Using default config: '%v'", *EnvironmentConfig)
	return *EnvironmentConfig
}

// GetExperiments returns the Experiments flag split on commas, or nil when
// the flag is empty. Items are returned exactly as written: surrounding
// whitespace is kept and consecutive or trailing commas produce empty items.
func GetExperiments() []string {
	if raw := *Experiments; raw != "" {
		return strings.Split(raw, ",")
	}
	return nil
}
